home *** CD-ROM | disk | FTP | other *** search
- /* ----------------------------------------------------------------
- * FILE
- * slaves.c
- *
- * DESCRIPTION
- * slave backend management routines
- *
- * INTERFACE ROUTINES
- * SlaveMain()
- * SlaveBackendsInit()
- * SlaveBackendsAbort()
- *
- * NOTES
- *
- * IDENTIFICATION
- * $Header: /private/postgres/src/tcop/RCS/slaves.c,v 1.24 1992/07/06 05:03:08 mao Exp $
- * ----------------------------------------------------------------
- */
- #include <signal.h>
- #include <setjmp.h>
-
- #include "tmp/postgres.h"
-
- RcsId("$Header: /private/postgres/src/tcop/RCS/slaves.c,v 1.24 1992/07/06 05:03:08 mao Exp $");
-
- /* ----------------
- * FILE INCLUDE ORDER GUIDELINES
- *
- * 1) tcopdebug.h
- * 2) various support files ("everything else")
- * 3) node files
- * 4) catalog/ files
- * 5) execdefs.h and execmisc.h, if necessary.
- * 6) extern files come last.
- * ----------------
- */
- #include "tcop/tcopdebug.h"
- #include "nodes/plannodes.h"
- #include "nodes/plannodes.a.h"
- #include "nodes/execnodes.h"
- #include "executor/execdesc.h"
- #include "tcop/dest.h"
- #include "tcop/pquery.h"
- #include "tcop/slaves.h"
-
- #include "access/xact.h"
- #include "utils/log.h"
- #include "catalog/syscache.h"
-
- /* ----------------
- * parallel state variables
- * ----------------
- */
- /*
- * local data structures
- */
- extern int MyPid; /* int representing the process id, defined in execipc.c */
- static ProcessNode *SlaveArray, *FreeSlaveP;
- int NumberOfFreeSlaves;
- ProcGroupLocalInfo ProcGroupLocalInfoP; /* process group local info */
- static ProcGroupLocalInfo FreeProcGroupP;
- extern SlaveLocalInfoData SlaveLocalInfoD; /* defined in execipc.c */
- extern int AdjustParallelismEnabled;
- FILE *StatFp;
- static bool RestartForParAdj = false; /* indicating that the longjmp to
- SlaveRestartPoint is for paradj */
- static List QueryDesc;
-
- /*
- * shared data structures
- */
- int *MasterProcessIdP; /* master backend process id */
- int *SlaveAbortFlagP; /* flag set during a transaction abort */
- extern SlaveInfo SlaveInfoP; /* slave backend info */
- extern ProcGroupInfo ProcGroupInfoP; /* process group info */
- /* defined in execipc.c to make
- postmaster happy */
-
- TransactionState SharedTransactionState; /* current transaction info */
-
- #define CONDITION_NORMAL 0
- #define CONDITION_ABORT 1
-
- /* --------------------------------
- * SendAbortSignals
- *
- * This sends a SIGHUP to every other backend in order
- * to cause them to preform their abort processing when
- * we discover a reason to abort the current transaction.
- * --------------------------------
- */
- void
- SendAbortSignals()
- {
- int nslaves; /* number of slaves */
- int i; /* counter */
- int p; /* process id */
-
- #ifdef TCOP_SLAVESYNCDEBUG
- if (IsMaster)
- elog(DEBUG, "Master Backend sending abort signals");
- else
- elog(DEBUG, "Slave Backend %d sending abort signals", MyPid);
- #endif TCOP_SLAVESYNCDEBUG
-
- nslaves = GetNumberSlaveBackends();
- for (i=0; i<nslaves; i++)
- if (i != MyPid) {
- p = SlaveInfoP[i].unixPid;
- if (kill(p, SIGHUP) != 0) {
- fprintf(stderr, "signaling slave %d (pid %d): ", i, p);
- perror("kill");
- }
- }
-
- if (! IsMaster) {
- p = (*MasterProcessIdP);
- if (kill(p, SIGHUP) != 0) {
- fprintf(stderr, "signaling master (pid %d): ", p);
- perror("kill");
- }
- }
- }
-
- /* --------------------------------
- * SlaveRestart
- *
- * This is the signal / exception handler for the slave
- * backends. It causes processing to resume at the top
- * of SlaveMain(), right before SlaveBackendsAbort().
- * --------------------------------
- */
- jmp_buf SlaveRestartPoint;
- int SlaveWarnings = 0;
-
- void
- SlaveRestart()
- {
- SLAVE1_elog(DEBUG, "Slave Backend %d *** SlaveRestart ***", MyPid);
-
- longjmp(SlaveRestartPoint, 1);
- }
-
- /* --------------------------------
- * SlaveBackendsAbort
- *
- * this is called in each process when trouble arises.
- * --------------------------------
- */
- void
- SlaveBackendsAbort()
- {
- int nslaves; /* number of slaves */
- int i; /* counter */
-
- /* ----------------
- * if the abort flag is not set, then it means the problem
- * occurred within our process. So set the flag and tell
- * all the other backends to abort.
- *
- * only the aborting process does this:
- * ----------------
- */
- if ((*SlaveAbortFlagP) != CONDITION_ABORT) {
- (*SlaveAbortFlagP) = CONDITION_ABORT;
-
- SendAbortSignals();
- }
-
- /* ----------------
- * all processes do this:
- * ----------------
- */
- if (IsMaster) {
- /* ----------------
- * in the master:
- *
- * + we record the aborted transaction,
- * + reinitialize the synchronization semaphores
- * + reinitialize the executor shared memory
- * + signal the slave backends to cleanup.
- *
- * We then wait for all the slaves to finish doing their
- * cleanup and then clear the abort flag.
- * ----------------
- */
- SLAVE_elog(DEBUG, "Master Backend aborting current transaction");
- AbortCurrentTransaction();
-
- SLAVE_elog(DEBUG, "Master Backend reinitializing semaphores");
- nslaves = GetNumberSlaveBackends();
- for (i=0; i<nslaves; i++) {
- I_Start(i);
- }
- I_Finished();
-
- SLAVE_elog(DEBUG, "Master Backend reinitializing shared memory");
- ExecSMInit();
-
- SLAVE_elog(DEBUG, "Master Backend signaling slaves abort");
- V_Abort();
-
- SLAVE_elog(DEBUG, "Master Backend waiting for slave aborts");
- P_FinishedAbort();
-
- SLAVE_elog(DEBUG, "Master Backend reinitializing abort semaphore");
- I_Abort();
-
- SLAVE_elog(DEBUG, "Master Backend abort processing finished");
- (*SlaveAbortFlagP) = CONDITION_NORMAL;
-
- } else {
- /* ----------------
- * the slave backends preform their cleanup processing
- * after the master backend records the abort. This is
- * guaranteed because the slaves P on the semaphore which
- * isn't V'ed by the master until after its done aborting.
- * ----------------
- */
- SLAVE1_elog(DEBUG, "Slave Backend %d waiting to abort..",
- MyPid);
- P_Abort();
-
- SLAVE1_elog(DEBUG, "Slave Backend %d processing abort..",
- MyPid);
- SlaveAbortTransaction();
-
- SLAVE1_elog(DEBUG, "Slave Backend %d abort finished, signaling..",
- MyPid);
-
- V_Abort();
- V_FinishedAbort();
- }
- }
-
- /* --------------------------------
- * SlaveMain is the main loop executed by the slave backends
- * --------------------------------
- */
- void
- SlaveMain()
- {
- int i;
- int groupid;
- extern int ShowExecutorStats;
-
- /* -----------------
- * set the flag to false in case the SIGPARADJ is received
- * -----------------
- */
- SlaveLocalInfoD.isworking = false;
- /* ------------------
- * set stat file pointer if required
- * ------------------
- */
- if (ShowExecutorStats) {
- char fname[30];
- sprintf(fname, "/usr/tmp/hong/slave%d.stat", MyPid);
- StatFp = fopen(fname, "w");
- }
- /* ----------------
- * before we begin processing we have to register a SIGHUP
- * handler so that if a problem occurs in one slave backend,
- * all the backends resynchronize.
- * ----------------
- */
- if (setjmp(SlaveRestartPoint) != 0) {
- if (RestartForParAdj) {
- /* restart for parallelism adjustment */
- RestartForParAdj = false;
- }
- else {
- /* restart for transaction abort */
- SlaveWarnings++;
- SLAVE1_elog(DEBUG, "Slave Backend %d SlaveBackendsAbort()",
- MyPid);
-
- SlaveBackendsAbort();
- SLAVE1_elog(DEBUG, "Slave Backend %d SlaveBackendsAbort() done",
- MyPid);
- }
- }
-
- signal(SIGHUP, SlaveRestart);
-
- /* ----------------
- * POSTGRES slave processing loop begins here
- * ----------------
- */
- SLAVE1_elog(DEBUG, "Slave Backend %d entering slave loop...", MyPid);
-
- for(;;) {
- /* ----------------
- * setup caches, lock tables and memory stuff just
- * as if we were running within a transaction.
- * ----------------
- */
- SlaveStartTransaction();
-
- /* ----------------
- * The master V's on the slave start semaphores after
- * placing the plan fragment in shared memory.
- * Meanwhile each slave waits on its start semaphore
- * until signaled by the master.
- * ----------------
- */
- SLAVE1_elog(DEBUG, "Slave Backend %d waiting...", MyPid);
-
- P_Start(MyPid);
-
- SLAVE1_elog(DEBUG, "Slave Backend %d starting task.", MyPid);
-
- if (ShowExecutorStats)
- ResetUsage();
- /* ------------------
- * initialize slave local info
- * ------------------
- */
- groupid = SlaveInfoP[MyPid].groupId;
- SlaveLocalInfoD.startpage = SlaveInfoP[MyPid].groupPid +
- (SlaveInfoP[MyPid].isAddOnSlave ?
- ProcGroupInfoP[groupid].paradjpage : 0);
- SlaveLocalInfoD.nparallel = ProcGroupInfoP[groupid].nprocess;
- SlaveLocalInfoD.paradjpending = false;
- SlaveLocalInfoD.paradjpage = -1;
- SlaveLocalInfoD.newparallel = -1;
- SlaveLocalInfoD.heapscandesc = NULL;
- SlaveLocalInfoD.indexscandesc = NULL;
- SlaveLocalInfoD.isworking = true;
-
- /* ----------------
- * get the query descriptor to execute.
- * ----------------
- */
- QueryDesc = (List)CopyObject((List)ProcGroupInfoP[groupid].queryDesc);
-
- /* ----------------
- * process the query descriptor
- * ----------------
- */
- if (QueryDesc != NULL)
- ProcessQueryDesc(QueryDesc);
-
- /* ---------------
- * it is important to set isDone to true before
- * SlaveCommitTransaction(), because it may
- * free the memory of SlaveInfoP[MyPid].heapscandesc
- * ---------------
- */
- SlaveInfoP[MyPid].isDone = true;
-
- if (ShowExecutorStats)
- ShowUsage();
- /* ----------------
- * clean caches, lock tables and memory stuff just
- * as if we were running within a transaction.
- * ----------------
- */
- SlaveCommitTransaction();
-
- /* ----------------
- * when the slave finishes, it signals the master
- * backend by V'ing on the master's finished semaphore.
- *
- * Since the master started by placing nslaves locks
- * on the semaphore, the semaphore will be decremented
- * each time a slave finishes. the last slave to finish
- * should bring the finished count to 1;
- * ----------------
- */
- SLAVE1_elog(DEBUG, "Slave Backend %d task complete.", MyPid);
- SlaveLocalInfoD.isworking = false;
- V_Finished(groupid, &(ProcGroupInfoP[groupid].scounter), FINISHED);
- }
- }
-
- /* --------------------------------
- * MoveTransactionState copies the transaction system's
- * CurrentTransactionState information into permanent shared
- * memory and then sets the transaction system's state
- * pointer to the shared memory copy. This is necessary to
- * have the transaction system use the shared state in each
- * of the parallel backends.
- * --------------------------------
- */
- void
- MoveTransactionState()
- {
- int nbytes = sizeof(TransactionStateData);
- extern TransactionState CurrentTransactionState;
-
- bcopy(CurrentTransactionState, SharedTransactionState, nbytes);
- CurrentTransactionState = SharedTransactionState;
- }
-
- /* --------------------------------
- * SlaveBackendsInit
- *
- * SlaveBackendsInit initializes the communication structures,
- * forks several "worker" backends and returns.
- *
- * Note: this function only returns in the master
- * backend. The worker backends run forever in
- * a separate execution loop.
- * --------------------------------
- */
-
- void
- SlaveBackendsInit()
- {
- int nslaves; /* number of slaves */
- int i; /* counter */
- int p; /* process id returned by fork() */
- extern int ProcessAffinityOn; /* process affinity on flag */
-
- /* ----------------
- * first initialize shared memory and get the number of
- * slave backends.
- * ----------------
- */
- nslaves = GetNumberSlaveBackends();
-
- /* -----------------
- * the following calls to SearchSysCacheTuple() are total hacks
- * what they do is to pre-initialize all the caches so that
- * performance numbers will look better. they can be removed at
- * any time.
- * -----------------
- */
- SearchSysCacheTuple(AMOPOPID, NULL, NULL, NULL, NULL);
- SearchSysCacheTuple(AMOPSTRATEGY, NULL, NULL, NULL, NULL);
- SearchSysCacheTuple(ATTNAME, "", NULL, NULL, NULL);
- SearchSysCacheTuple(ATTNUM, NULL, NULL, NULL, NULL);
- SearchSysCacheTuple(INDEXRELID, NULL, NULL, NULL, NULL);
- SearchSysCacheTuple(LANNAME, "", NULL, NULL, NULL);
- SearchSysCacheTuple(OPRNAME, "", NULL, NULL, NULL);
- SearchSysCacheTuple(OPROID, NULL, NULL, NULL, NULL);
- SearchSysCacheTuple(PRONAME, "", NULL, NULL, NULL);
- SearchSysCacheTuple(PROOID, NULL, NULL, NULL, NULL);
- SearchSysCacheTuple(RELNAME, "", NULL, NULL, NULL);
- SearchSysCacheTuple(RELOID, NULL, NULL, NULL, NULL);
- SearchSysCacheTuple(TYPNAME, "", NULL, NULL, NULL);
- SearchSysCacheTuple(TYPOID, NULL, NULL, NULL, NULL);
- SearchSysCacheTuple(AMNAME, "", NULL, NULL, NULL);
- SearchSysCacheTuple(CLANAME, "", NULL, NULL, NULL);
- SearchSysCacheTuple(INDRELIDKEY, NULL, NULL, NULL, NULL);
- SearchSysCacheTuple(INHRELID, NULL, NULL, NULL, NULL);
- SearchSysCacheTuple(PRS2PLANCODE, NULL, NULL, NULL, NULL);
- SearchSysCacheTuple(RULOID, NULL, NULL, NULL, NULL);
- SearchSysCacheTuple(PRS2STUB, NULL, NULL, NULL, NULL);
-
- /* --------------------
- * set signal for dynamically adjusting degrees of parallelism
- * --------------------
- */
- if (AdjustParallelismEnabled)
- signal(SIGPARADJ, paradj_handler);
-
- /* ----------------
- * initialize Start, Finished, and Abort semaphores
- * ----------------
- */
- for (i=0; i<nslaves; i++)
- I_Start(i);
- I_Finished();
- I_Abort();
-
- /* ----------------
- * allocate area in shared memory for process ids and
- * communication mechanisms. All backends share these pointers.
- * ----------------
- */
- MasterProcessIdP = (int*)ExecSMReserve(sizeof(int));
- SlaveAbortFlagP = (int*)ExecSMReserve(sizeof(int));
- SlaveInfoP = (SlaveInfo)ExecSMReserve(nslaves * sizeof(SlaveInfoData));
- ProcGroupInfoP = (ProcGroupInfo)ExecSMReserve(nslaves *
- sizeof(ProcGroupInfoData));
- SharedTransactionState = (TransactionState)ExecSMReserve(
- sizeof(TransactionStateData));
-
- /* ----------------
- * move the transaction system state data into shared memory
- * before we fork so that all backends share the transaction state.
- * ----------------
- */
- MoveTransactionState();
-
- /* ----------------
- * initialize executor shared memory
- * ----------------
- */
- ExecSMInit();
- /* ----------------
- * initialize shared memory variables
- * ----------------
- */
- (*MasterProcessIdP) = getpid();
- (*SlaveAbortFlagP) = CONDITION_NORMAL;
- /* ----------------
- * fork several slave processes and save the process id's
- * ----------------
- */
- MyPid = -1;
-
- for (i=0; i<nslaves; i++)
- if (IsMaster) {
- if ((p = fork()) != 0) {
- if (p < 0) {
- perror("fork");
- exitpg(1);
- }
- /* initialize shared data structures */
- SlaveInfoP[i].unixPid = p;
- SlaveInfoP[i].groupId = -1;
- SlaveInfoP[i].groupPid = -1;
- SlaveInfoP[i].resultTmpRelDesc = NULL;
- #ifdef sequent
- S_INIT_LOCK(&(SlaveInfoP[i].lock));
- S_LOCK(&(SlaveInfoP[i].lock));
- #endif
- ProcGroupInfoP[i].status = IDLE;
- ProcGroupInfoP[i].queryDesc = NULL;
- ProcGroupInfoP[i].scounter.count = 0;
- ProcGroupInfoP[i].dropoutcounter.count = 0;
- #ifdef sequent
- S_INIT_LOCK(&(ProcGroupInfoP[i].scounter.exlock));
- S_INIT_LOCK(&(ProcGroupInfoP[i].dropoutcounter.exlock));
- #endif
- ProcGroupInfoP[i].paradjpage = NULLPAGE;
- InitMWaitOneLock(&(ProcGroupInfoP[i].m1lock));
- } else {
- MyPid = i;
- #ifdef sequent
- if (ProcessAffinityOn && GetNumberSlaveBackends() <= 12) {
- int prev;
- /* ----------------------
- * bind slave to a specific processor
- * ----------------------
- */
- prev = tmp_affinity(MyPid);
- if (prev == -1) {
- fprintf(stderr, "slave %d ", MyPid);
- perror("tmp_affinity");
- }
- }
- #endif
- SLAVE1_elog(DEBUG, "Slave Backend %d alive!", MyPid);
- }
- }
-
- /* ----------------
- * now ExecIsMaster is true in the master backend and is false
- * in each of the slaves. In addition, each slave is branded
- * with a slave id to identify it. So, if we're a slave, we now
- * get sent off to the labor camp, never to return..
- * ----------------
- */
- if (IsMaster) {
- /* initialize local data structures of the master */
- SlaveArray = (ProcessNode*)malloc(nslaves * sizeof(ProcessNode));
- FreeSlaveP = SlaveArray;
- NumberOfFreeSlaves = nslaves;
- for (i=0; i<nslaves; i++) {
- SlaveArray[i].pid = i;
- SlaveArray[i].next = SlaveArray + i + 1;
- }
- SlaveArray[nslaves-1].next = NULL;
- ProcGroupLocalInfoP = (ProcGroupLocalInfo)malloc(nslaves *
- sizeof(ProcGroupLocalInfoData));
- FreeProcGroupP = ProcGroupLocalInfoP;
- for (i=0; i<nslaves; i++) {
- ProcGroupLocalInfoP[i].id = i;
- ProcGroupLocalInfoP[i].fragment = NULL;
- ProcGroupLocalInfoP[i].memberProc = NULL;
- ProcGroupLocalInfoP[i].nextfree = ProcGroupLocalInfoP + i + 1;
- ProcGroupLocalInfoP[i].resultTmpRelDescList = LispNil;
- }
- ProcGroupLocalInfoP[nslaves - 1].nextfree = NULL;
- }
- else {
- SlaveMain();
- }
-
- /* ----------------
- * here we're in the master and we've completed initialization
- * so we return to the read..parse..plan..execute loop.
- * ----------------
- */
- return;
- }
-
- /* ----------------------
- * getFreeSlave
- *
- * get a free slave backend from the FreeSlaveP queue
- * also decrements NumberOfFreeSlaves
- * ----------------------
- */
- int
- getFreeSlave()
- {
- ProcessNode *p;
-
- p = FreeSlaveP;
- FreeSlaveP = p->next;
- NumberOfFreeSlaves--;
-
- return p->pid;
- }
-
- /* ------------------------
- * freeSlave
- *
- * frees a slave to FreeSlaveP queue
- * increments NumberOfFreeSlaves
- * ------------------------
- */
- void
- freeSlave(i)
- int i;
- {
- SlaveArray[i].next = FreeSlaveP;
- FreeSlaveP = SlaveArray + i;
- SlaveInfoP[i].groupId = -1;
- SlaveInfoP[i].groupPid = -1;
- NumberOfFreeSlaves++;
- }
-
- /* ------------------------
- * getFreeProcGroup
- *
- * get a free process group with nproc free slave processes
- * ------------------------
- */
- int
- getFreeProcGroup(nproc)
- int nproc;
- {
- ProcGroupLocalInfo p;
- ProcessNode *slavep;
- int i;
- int pid;
-
- p = FreeProcGroupP;
- FreeProcGroupP = p->nextfree;
- pid = getFreeSlave();
- SlaveInfoP[pid].groupId = p->id;
- SlaveInfoP[pid].groupPid = 0;
- SlaveInfoP[pid].isAddOnSlave = false;
- SlaveInfoP[pid].isDone = false;
- p->memberProc = SlaveArray + pid;
- slavep = p->memberProc;
- for (i=1; i<nproc; i++) {
- pid = getFreeSlave();
- SlaveInfoP[pid].groupId = p->id;
- SlaveInfoP[pid].groupPid = i;
- SlaveInfoP[pid].isAddOnSlave = false;
- SlaveInfoP[pid].isDone = false;
- slavep->next = SlaveArray + pid;
- slavep = slavep->next;
- }
- slavep->next = NULL;
- return p->id;
- }
-
- /* --------------------------
- * addSlaveToProcGroup
- *
- * add a free slave to an existing process group
- * --------------------------
- */
- void
- addSlaveToProcGroup(slave, group, groupid)
- int slave;
- int group;
- int groupid;
- {
- SlaveInfoP[slave].groupId = group;
- SlaveInfoP[slave].groupPid = groupid;
- SlaveInfoP[slave].isAddOnSlave = true;
- SlaveArray[slave].next = ProcGroupLocalInfoP[group].memberProc;
- ProcGroupLocalInfoP[group].memberProc = SlaveArray + slave;
- }
-
- /* -------------------------
- * freeProcGroup
- *
- * frees a process group and all the slaves in the group
- * -------------------------
- */
- void
- freeProcGroup(gid)
- int gid;
- {
- ProcessNode *p, *nextp;
-
- p=ProcGroupLocalInfoP[gid].memberProc;
- while (p != NULL) {
- nextp = p->next;
- freeSlave(p->pid);
- p = nextp;
- }
- ProcGroupInfoP[gid].status = IDLE;
- ProcGroupLocalInfoP[gid].fragment = NULL;
- ProcGroupLocalInfoP[gid].nextfree = FreeProcGroupP;
- ProcGroupLocalInfoP[gid].resultTmpRelDescList = LispNil;
- FreeProcGroupP = ProcGroupLocalInfoP + gid;
- }
-
- /* ----------------------------
- * getFinishedProcGroup
- *
- * walks the array of processes group and find the first
- * process group with status = FINISHED or PARADJPENDING
- * -----------------------------
- */
- int
- getFinishedProcGroup()
- {
- int i;
-
- for (i=0; i<GetNumberSlaveBackends(); i++) {
- if (ProcGroupInfoP[i].status == FINISHED ||
- ProcGroupInfoP[i].status == PARADJPENDING)
- return i;
- }
- return -1;
- }
-
- /* -------------------------------
- * wakeupProcGroup
- *
- * wake up the processes in process group
- * -------------------------------
- */
- void
- wakeupProcGroup(groupid)
- int groupid;
- {
- ProcessNode *p;
-
- for (p = ProcGroupLocalInfoP[groupid].memberProc;
- p != NULL;
- p = p->next) {
- V_Start(p->pid);
- }
- }
-
- /* ---------------------------------
- * signalProcGroup
- *
- * send a signal to a process group
- * ---------------------------------
- */
- void
- signalProcGroup(groupid, sig)
- int groupid;
- int sig;
- {
- ProcessNode *p;
-
- for (p = ProcGroupLocalInfoP[groupid].memberProc;
- p != NULL;
- p = p->next) {
- kill(SlaveInfoP[p->pid].unixPid, sig);
- }
- }
-
- /* ------------------------------
- * the following routines are a specialized memory allocator for
- * the process groups. they only supposed to be called by the master
- * backend. no mutex is done.
- * ------------------------------
- */
- static char *CurrentSMSegmentStart;
- static char *CurrentSMSegmentEnd;
- static int CurrentSMGroupid;
- static char *CurrentSMSegmentPointer;
-
- /* --------------------------------
- * ProcGroupSMBeginAlloc
- *
- * begins shared memory allocation for process group
- * --------------------------------
- */
- void
- ProcGroupSMBeginAlloc(groupid)
- int groupid;
- {
- MemoryHeader mp;
-
- mp = ExecGetSMSegment();
- ProcGroupLocalInfoP[groupid].groupSMQueue = mp;
- mp->next = NULL;
- CurrentSMSegmentStart = mp->beginaddr;
- CurrentSMSegmentEnd = CurrentSMSegmentStart + mp->size;
- CurrentSMGroupid = groupid;
- CurrentSMSegmentPointer = CurrentSMSegmentStart;
- }
-
- /* -------------------------------
- * ProcGroupSMEndAlloc
- *
- * ends shared memory allocation for process group
- * frees the leftover memory from current memory segment
- * -------------------------------
- */
- void
- ProcGroupSMEndAlloc()
- {
- int usedsize;
- MemoryHeader mp;
-
- usedsize = CurrentSMSegmentPointer - CurrentSMSegmentStart;
- mp = ProcGroupLocalInfoP[CurrentSMGroupid].groupSMQueue;
- CurrentSMSegmentStart=CurrentSMSegmentPointer=CurrentSMSegmentEnd = NULL;
- ExecSMSegmentFreeUnused(mp, usedsize);
- }
-
- /* --------------------------------
- * ProcGroupSMAlloc
- *
- * allocate shared memory within a process group
- * if the current memory segment runs out, allocate a new segment
- * ---------------------------------
- */
- char *
- ProcGroupSMAlloc(size)
- int size;
- {
- MemoryHeader mp;
- char *retP;
-
- while (CurrentSMSegmentPointer + size > CurrentSMSegmentEnd) {
- mp = ExecGetSMSegment();
- if (mp == NULL)
- elog(WARN, "out of executor shared memory, got to die.");
- mp->next = ProcGroupLocalInfoP[CurrentSMGroupid].groupSMQueue;
- ProcGroupLocalInfoP[CurrentSMGroupid].groupSMQueue = mp;
- CurrentSMSegmentStart = mp->beginaddr;
- CurrentSMSegmentEnd = CurrentSMSegmentStart + mp->size;
- CurrentSMSegmentPointer = CurrentSMSegmentStart;
- }
- retP = CurrentSMSegmentPointer;
- CurrentSMSegmentPointer = (char*)LONGALIGN(CurrentSMSegmentPointer + size);
- return retP;
- }
-
- /* -------------------------------
- * ProcGroupSMClean
- *
- * frees the shared memory allocated for a process group
- * -------------------------------
- */
- void
- ProcGroupSMClean(groupid)
- int groupid;
- {
- MemoryHeader mp, nextp;
-
- mp = ProcGroupLocalInfoP[groupid].groupSMQueue;
- while (mp != NULL) {
- nextp = mp->next;
- ExecSMSegmentFree(mp);
- mp = nextp;
- }
- }
-
- /* -----------------------------
- * the following routines are special functions for copying reldescs to
- * shared memory
- * -----------------------------
- */
- static char *SlaveTmpRelDescMemoryP;
-
- /* -------------------------------
- * SlaveTmpRelDescInit
- *
- * initialize shared memory preallocated for temporary relation descriptors
- * -------------------------------
- */
- void
- SlaveTmpRelDescInit()
- {
- SlaveTmpRelDescMemoryP = (char*)SlaveInfoP[MyPid].resultTmpRelDesc;
- }
-
- /* -------------------------------
- * SlaveTmpRelDescAlloc
- *
- * memory allocation for reldesc copying
- * Note: there is no boundary checking, so had better pre-allocate
- * enough memory!
- * -------------------------------
- */
- char *
- SlaveTmpRelDescAlloc(size)
- int size;
- {
- char *retP;
-
- retP = SlaveTmpRelDescMemoryP;
- SlaveTmpRelDescMemoryP = (char*)LONGALIGN(SlaveTmpRelDescMemoryP + size);
-
- return retP;
- }
-
- /* ---------------------------------
- * getProcGroupMaxPage
- *
- * find out the largest page number the slaves are scanning
- * used only after SIGPARADJ signal has been sent to the
- * process group.
- * ---------------------------------
- */
- int
- getProcGroupMaxPage(groupid)
- int groupid;
- {
- ProcessNode *p;
- int maxpage = NULLPAGE;
- int page;
-
- for (p = ProcGroupLocalInfoP[groupid].memberProc;
- p != NULL;
- p = p->next) {
- #ifdef HAS_TEST_AND_SET
- S_LOCK(&(SlaveInfoP[p->pid].lock));
- #endif
- page = SlaveInfoP[p->pid].curpage;
- if (page == NOPARADJ)
- maxpage = NOPARADJ;
- if (maxpage < page && maxpage != NOPARADJ)
- maxpage = page;
- }
- return maxpage;
- }
-
- /* ---------------------------------------
- * paradj_handler
- *
- * signal handler for dynamically adjusting degrees of parallelism
- * XXX only handle heap scan now.
- * ---------------------------------------
- */
- int
- paradj_handler()
- {
- BlockNumber curpage;
- HeapTuple curtuple;
- ItemPointer tid;
- int groupid;
-
- SLAVE1_elog(DEBUG, "slave %d got SIGPARADJ", MyPid);
- if (SlaveInfoP[MyPid].isDone) {
- /* -----------------------
- * this means that the whole job is almost done
- * no adjustment to parallelism should be made
- * ------------------------
- */
- SlaveInfoP[MyPid].curpage = NOPARADJ;
- curpage = NOPARADJ;
- }
- else
- if (!SlaveLocalInfoD.isworking || SlaveLocalInfoD.heapscandesc == NULL) {
- if (SlaveInfoP[MyPid].isAddOnSlave) {
- SlaveInfoP[MyPid].curpage = SlaveLocalInfoD.startpage;
- curpage = SlaveLocalInfoD.startpage;
- }
- else {
- SlaveInfoP[MyPid].curpage = NULLPAGE;
- curpage = NULLPAGE;
- }
- }
- else {
- curtuple = SlaveLocalInfoD.heapscandesc->rs_ctup;
- tid = &(curtuple->t_ctid);
- if (ItemPointerIsValid(tid)) {
- curpage = ItemPointerGetBlockNumber(tid);
- }
- else {
- curpage = NULLPAGE;
- }
- SlaveInfoP[MyPid].curpage = curpage;
- }
- #ifdef HAS_TEST_AND_SET
- S_UNLOCK(&(SlaveInfoP[MyPid].lock));
- #endif
- SLAVE2_elog(DEBUG, "slave %d sending back curpage = %d", MyPid, curpage);
- groupid = SlaveInfoP[MyPid].groupId;
- MWaitOne(&(ProcGroupInfoP[groupid].m1lock));
- SLAVE1_elog(DEBUG, "slave %d complete handshaking with master", MyPid);
- if (ProcGroupInfoP[groupid].paradjpage == NOPARADJ) {
- /* ----------------------
- * this means that the master changed his/her mind
- * no adjustment to parallelism will be done
- * ----------------------
- */
- return;
- }
- SlaveLocalInfoD.paradjpending = true;
- SlaveLocalInfoD.paradjpage = ProcGroupInfoP[groupid].paradjpage;
- SlaveLocalInfoD.newparallel = ProcGroupInfoP[groupid].newparallel;
- return;
- }
-
- /* ------------------------------------
- * paradj_nextpage
- *
- * check if parallelism adjustment point is reached, if so
- * figure out and return the next page to scan.
- * XXX only works for heapscan right now.
- * -------------------------------------
- */
- int
- paradj_nextpage(page, dir)
- int page;
- int dir;
- {
- if (SlaveLocalInfoD.paradjpending) {
- if (page >= SlaveLocalInfoD.paradjpage) {
- SLAVE2_elog(DEBUG, "slave %d adjusting page skip to %d",
- MyPid, SlaveLocalInfoD.newparallel);
- if (SlaveLocalInfoD.newparallel >= SlaveLocalInfoD.nparallel ||
- SlaveInfoP[MyPid].groupPid < SlaveLocalInfoD.newparallel) {
- SlaveLocalInfoD.nparallel = SlaveLocalInfoD.newparallel;
- SlaveLocalInfoD.paradjpending = false;
- if (dir < 0)
- return
- SlaveLocalInfoD.paradjpage-SlaveInfoP[MyPid].groupPid;
- else
- return
- SlaveLocalInfoD.paradjpage+SlaveInfoP[MyPid].groupPid;
- }
- else {
- int groupid = SlaveInfoP[MyPid].groupId;
- Plan plan = QdGetPlan(QueryDesc);
- EState estate = (EState)get_state(plan);
- EndPlan(plan, estate);
- V_Finished(groupid, &(ProcGroupInfoP[groupid].dropoutcounter),
- PARADJPENDING);
- RestartForParAdj = true;
- SlaveRestart();
- }
- }
- }
- else
- return NULLPAGE;
- }
-